在這篇文章中,我們將探討如何在AWS Lambda上運行一個基於Node.js的 LINE Bot Handler,並解析其技術原理與最佳實踐。首先,我們將概述AWS Lambda的執行生命周期,並介紹官方定義的Lambda Function格式。隨後,我們會詳細檢視事件處理邏輯及Kafka Producer的程式結構。
以下Lambda Function用於處理 LINE bot Webhook,並將提取的訊息內容
和回覆訊息所需的Token
傳送到 Kafka。
一個 Lambda Function 必須遵循 AWS 官方規範的檔案和函數名稱。根據使用的程式語言,這些規範會有所不同。例如,對於Java 17,程式碼必須包含一個類別HandlerIntegerJava17
,並在其中定義 handleRequest
函數。
在本專案中,我們使用的是 Node.js,因此必須有一個名為index.js
或index.mjs
的檔案,並在其中定義一個handler
function。其基本結構如下:
export const handler = async (event, context) => {
// 取得AWS Lambda event內容。
// event 是一個 JSON 字串,可以從中解析出 LINE Platform 傳送來的資訊。
const bodyContent = JSON.parse(body);
return context.logStreamName;
};
.env
.eslintrc.json
deploy.sh
env.example
index.js
package.json
utils/kafka/producer.js
@line/bot-sdk
: 用於直接回覆LINE使用者的訊息。aws-xray-sdk
: 此為將Node.js的程式部署到AWS Lambda上執行的必要依賴dotenv
: 用於加載環境變數。kafkajs
: 用於將使用者的訊息及Token寫入Kafka Message Queue,供其他程式做進一步的處理。eslint
: 用於程式碼格式檢查。eslint-config-airbnb-base
: 本專案使用Airbnb 的 ESLint規則。eslint-plugin-import
: 用於支持 ES6 模塊的 ESLint 插件。在 .env
文件中包含了以下環境變數:
# 要連接的Kafka Host IP 或 Domain Name
KAFKA_HOST_IP=
# 要將訊息送到哪一個Kafka Topic
KAFKA_TOPIC=TEST-MESSAGE-TOPIC
# 要連接的Kafka Port, 預設為9092
KAFKA_PORT=
# 要連接的Client id
KAFKA_CLIENT_ID=
# 指定LINE BOT 的Channel Access Token
CHANNEL_ACCESS_TOKEN=
使用上一篇的deploy.sh
腳本進行部署:
function=LinebotHandlerNode ./deploy.sh
index.js
event.body
的內容就是LINE Platform送過來的Webhook。const { produceMessage } = require('./utils/kafka/producer');
require('dotenv').config();
const kafkaHandler = async (event, context) => {
const kafkaTopic = process.env.KAFKA_TOPIC;
const { body } = event;
const bodyContent = JSON.parse(body);
const { events } = bodyContent;
const produceKafkaResults = [];
events.forEach((e) => {
if (e.type === 'message' && e.message !== undefined && e.message.type === 'text') {
const {
source: {
userId,
},
message: {
text,
},
timestamp,
replyToken,
} = e;
produceKafkaResults.push(produceMessage({
topic: kafkaTopic,
messages: [{ key: userId, value: text }],
}));
}
});
await Promise.all(produceKafkaResults);
return context.logStreamName;
};
// 簡單的handler Factory,當需要開發多個Handler function需要切換時,較為方便
const handlerFactory = (name) => {
const functionCode = {
kafka: kafkaHandler,
other: otherHandler,
};
return functionCode[name];
};
// 將Lambda Handler與主要邏輯分開,方便測試及抽換
exports.handler = async function (event, context) {
const handler = handlerFactory('kafka');
await handler(event, context);
};
utils/kafka/producer.js
produceMessage
: 這邊先從環境變數取得需要的Kafka Host, Port, 以及要指定的Topic等連線資訊。最後將指定的訊息送到對應的Topic。const {
Kafka, logLevel, CompressionTypes, Partitioners,
} = require('kafkajs');
require('dotenv').config();
const host = process.env.KAFKA_HOST_IP || 'localhost';
const port = process.env.KAFKA_PORT || 9092;
const clientId = process.env.KAFKA_CLIENT_ID || 'example-producer';
const kafka = new Kafka({
logLevel: logLevel.DEBUG,
brokers: [`${host}:${port}`],
clientId,
});
const producer = kafka.producer({
allowAutoTopicCreation: true,
createPartitioner: Partitioners.LegacyPartitioner,
});
const produceMessage = async ({ topic = 'NON-GIVEN-MESSAGE', messages = [] } = {}) => {
try {
await producer.connect();
await producer.send({
topic,
compression: CompressionTypes.GZIP,
messages,
});
} catch (e) {
console.error(`[example/producer] ${e.message}`, e);
} finally {
await producer.disconnect();
}
};
module.exports = {
produceMessage,
};
以下說明幾個官方提到的AWS Lambda最佳實踐:
console.log
來記錄的日誌,將自動發送到 Amazon CloudWatch Logs,可以在AWS Lambda 主控台找到這個 Lambda Function所在的Log:以上介紹的 Node.js Lambda Function 能夠解析從 LINE Platform 傳送過來的 Webhook,並從中提取使用者的訊息內容、Token 及其他相關資訊。
由於我們希望整個系統由各司其職的小型服務組成,在這裡,我們不直接對取得的資訊做後續處理,取而代之的是,透過Kafka這個Message Queue,將訊息傳遞給後端的 Bot Server,交給它來做後續的語意判斷及資料儲存等工作。
Kafka作為多個小型服務彼此溝通的橋樑,其運作原理以及如何設定,我們將在後面的章節詳細介紹。
https://docs.aws.amazon.com/lambda/latest/operatorguide/execution-environments.html
https://docs.aws.amazon.com/lambda/latest/dg/nodejs-handler.html#nodejs-best-practices